package com.xplay.xpocker.socket;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.xplay.xpocker.business.BusinessEnum;
import com.xplay.xpocker.business.BusinessException;
import com.xplay.xpocker.meta.room.MetaRoomEntity;
import com.xplay.xpocker.message.action.IMessageHandler;
import com.xplay.xpocker.message.action.MessageHandlerContext;
import com.xplay.xpocker.meta.MessageContent;
import com.xplay.xpocker.message.connecter.ConnectHandlerContext;
import com.xplay.xpocker.observer.SubscriptionSubject;
import com.xplay.xpocker.service.mahjong.IRoomService;
import com.xplay.xpocker.util.*;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Netty inbound handler that performs the WebSocket handshake for a room connection and then
 * dispatches the WebSocket frames received on that connection.
 *
 * <p>The class is annotated {@code @ChannelHandler.Sharable} and registered as a Spring
 * {@code @Component}, so it must not keep per-connection state in instance fields. All
 * per-connection state (user id, room code, room type, handshaker) is stored in channel attributes.
 *
 * <p>Room-level work is done while holding the room's {@link ReentrantLock} taken from
 * {@code roomActionLock}.
 */
@Component
@ChannelHandler.Sharable
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> implements UserChannelMate {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);

    /** Per-channel storage for the handshaker; a shared handler cannot keep it in an instance field. */
    private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_KEY =
            AttributeKey.valueOf("WebSocketServerHandler.handshaker");

    /** Maximum accepted WebSocket frame payload length, in bytes. */
    private static final int MAX_FRAME_PAYLOAD_LENGTH = 5 * 1024 * 1024;

    /** Text payload the client sends as an application-level heartbeat. */
    private static final String HEARTBEAT_PAYLOAD = "===5oiR55qE5b+D6Lez===";

    @Autowired
    private MessageHandlerContext messageHandlerContext;

    @Autowired
    private ConnectHandlerContext connectHandlerContext;

    @Autowired
    @Qualifier(value = "roomActionLock")
    ConcurrentHashMap<String, ReentrantLock> roomActionLock;

    @Autowired
    private IRoomService roomService;

    /**
     * Heartbeat check: when a {@link IdleState#READER_IDLE} event fires for the channel, the
     * channel's user is marked offline in its room. Every other event is passed to the superclass.
     *
     * @param ctx the channel context the event was fired on
     * @param evt the user event
     * @throws Exception if the superclass handling fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Check the type BEFORE casting; other user events must reach the superclass untouched.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) evt;
            if (idleEvent.state() == IdleState.READER_IDLE) {
                userOnline(getRoomCode(ctx), getUserId(ctx), false, true);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Updates a user's online state in a room and notifies the room's subscribers.
     *
     * <p>Does nothing when {@code roomCode} is {@code null} or no lock is registered for the room.
     * Exceptions raised while updating or notifying are logged and not rethrown.
     *
     * @param roomCode room the user belongs to; may be {@code null} for channels that never joined a room
     * @param userId   id of the user whose state changes
     * @param online   {@code true} to mark the user online, {@code false} to mark the user offline
     * @param addLock  {@code true} to acquire the room lock here; pass {@code false} when the caller
     *                 already holds it
     */
    public void userOnline(String roomCode, String userId, boolean online, boolean addLock) {
        // ConcurrentHashMap rejects null keys, and a channel that never finished the handshake has no room.
        if (roomCode == null) {
            return;
        }
        ReentrantLock roomLock = roomActionLock.get(roomCode);
        if (roomLock == null) {
            return;
        }
        if (addLock) {
            roomLock.lock();
        }
        try {
            // Persist the new state, then tell the players at this table about it.
            roomService.roomUserOnline(roomCode, userId, online);
            SubscriptionSubject subject = diskPart.get(roomCode);
            if (subject == null) {
                return;
            }
            String action = online
                    ? DictionaryConst.UserActionConst.ON_LINE
                    : DictionaryConst.UserActionConst.OFF_LINE;
            subject.notifyMessage(new MessageContent<String>().initActionAndUId(action, userId), userId);
        } catch (Exception e) {
            logger.error("failed to update online state, room={} user={} online={}", roomCode, userId, online, e);
        } finally {
            if (addLock) {
                roomLock.unlock();
            }
        }
    }

    /**
     * Routes an inbound message: the initial HTTP request goes to the handshake, WebSocket frames
     * go to frame handling. Any other message type is ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketRequest(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * Validates the upgrade request, performs the WebSocket handshake and runs the room-specific
     * connect logic while holding the room lock.
     *
     * <p>Requests that cannot be decoded, are not GET, lack the token or room parameter, or name a
     * room without a registered lock are answered with an HTTP error response.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        // The HTTP request could not be decoded.
        if (!request.decoderResult().isSuccess()) {
            rejectHttpRequest(ctx, request, HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if (!HttpMethod.GET.equals(request.method())) {
            rejectHttpRequest(ctx, request, HttpResponseStatus.FORBIDDEN);
            return;
        }

        Map<String, List<String>> parameters = new QueryStringDecoder(request.uri()).parameters();
        String token = firstValue(parameters.get(JwtUtils.TOKEN_HEADER));
        String roomCode = firstValue(parameters.get(ROOM_CODE_KEY));
        if (token == null || roomCode == null) {
            rejectHttpRequest(ctx, request, HttpResponseStatus.BAD_REQUEST);
            return;
        }

        logger.info("-------user-------connect----{}", roomCode);
        ReentrantLock roomLock = roomActionLock.get(roomCode);
        if (roomLock == null) {
            // No lock registered for this room code, so there is nothing to join.
            rejectHttpRequest(ctx, request, HttpResponseStatus.NOT_FOUND);
            return;
        }
        logger.info("-------user-------connect-------lock");

        // Tracks whether the channel already speaks WebSocket, which decides how errors are reported.
        boolean upgraded = false;
        roomLock.lock();
        try {
            BusinessAssertion.isTrue(BusinessEnum.ROOM_NOT_FOUND, !JwtUtils.validateToken(token));
            String userId = JwtUtils.getAllClaimsFromToken(token).getId();
            // Log the user id rather than the token: the token is a credential.
            logger.info("-------user---id[{}]----connect", userId);
            ctx.channel().attr(AttributeKey.valueOf(USER_CHANNEL_KEY)).set(userId);
            logger.info("-------user-------connect-------setparam");

            // Arguments: ws location, sub-protocols, allow extensions, max frame payload length.
            WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(request), null, true, MAX_FRAME_PAYLOAD_LENGTH);
            WebSocketServerHandshaker handshaker = factory.newHandshaker(request);
            if (handshaker == null) {
                // Unsupported WebSocket version: nothing was upgraded, so do not join the room.
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                return;
            }
            handshaker.handshake(ctx.channel(), request);
            ctx.channel().attr(HANDSHAKER_KEY).set(handshaker);
            upgraded = true;
            logger.info("-------user-------connect-------success");

            ctx.channel().attr(AttributeKey.valueOf(ROOM_CODE_KEY)).set(roomCode);
            MetaRoomEntity roomEntity = roomService.queryRoomInfo(roomCode);
            BusinessAssertion.isNull(BusinessEnum.ROOM_NOT_FOUND, roomEntity);
            ctx.channel().attr(AttributeKey.valueOf(ROOM_TYPE_KEY)).set(roomEntity.getRoomType());

            // Room-type specific connect handling, selected by room type.
            connectHandlerContext.getInstance(roomEntity.getRoomType()).doConnect(ctx, roomCode);
            // Mark the user online; the room lock is already held here.
            userOnline(getRoomCode(ctx), getUserId(ctx), true, false);
            logger.info("-------user-------connect-------success------setother---info");
        } catch (BusinessException e) {
            logger.warn("connection rejected for room {}: {}", roomCode, e.getMessage(), e);
            if (upgraded) {
                ctx.writeAndFlush(businessErrorFrame(e)).addListener(ChannelFutureListener.CLOSE);
            } else {
                // Before the upgrade the channel presumably still speaks HTTP, so answer in HTTP.
                rejectHttpRequest(ctx, request, HttpResponseStatus.FORBIDDEN);
            }
            logger.info(" channel 非法 加入");
        } finally {
            roomLock.unlock();
        }
    }

    /** Returns the first element of a query-parameter value list, or {@code null} when absent or empty. */
    private static String firstValue(List<String> values) {
        return (values == null || values.isEmpty()) ? null : values.get(0);
    }

    /** Sends an empty-bodied HTTP response with the given status via {@link #sendResponse}. */
    private void rejectHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
        sendResponse(ctx, request,
                new DefaultFullHttpResponse(request.protocolVersion(), status, ctx.alloc().buffer()));
    }

    /** Builds the text frame that reports a business error (message and state code) to the client. */
    private static TextWebSocketFrame businessErrorFrame(BusinessException e) {
        return new TextWebSocketFrame(JacksonStringUtil.objectToString(
                new MessageContent(e.getMessage(), null, null, e.getStateCode()), false));
    }

    /**
     * Builds the WebSocket location from the request's Host header.
     * Use {@code wss://} instead when SSL/TLS is enabled.
     */
    private String getWebSocketLocation(FullHttpRequest request) {
        String location = request.headers().get(HttpHeaderNames.HOST) + "/websocket";
        return "ws://" + location;
    }

    /**
     * Handles a single WebSocket frame: close, ping, text (heartbeat or business message) or
     * binary (echoed back).
     */
    private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close handshake.
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker = ctx.channel().attr(HANDSHAKER_KEY).get();
            if (handshaker != null) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            } else {
                ctx.close();
            }
            return;
        }
        // PING/PONG.
        if (frame instanceof PingWebSocketFrame) {
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Text frames carry heartbeats and business messages.
        if (frame instanceof TextWebSocketFrame) {
            handleTextFrame(ctx, ((TextWebSocketFrame) frame).text());
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            ctx.write(frame.retain());
        }
    }

    /**
     * Processes the text of a text frame: heartbeats are only logged; anything else is parsed as a
     * {@code MessageContent} and dispatched to the handler for the room type and action while
     * holding the room lock. Business errors are reported back to the client as a text frame.
     */
    private void handleTextFrame(ChannelHandlerContext ctx, String recv) {
        if (HEARTBEAT_PAYLOAD.equals(recv)) {
            logger.info("========client heart==========={}", getUserId(ctx));
            return;
        }
        String roomCode = getRoomCode(ctx);
        // ConcurrentHashMap rejects null keys; a missing room code is treated like a missing room.
        ReentrantLock roomLock = roomCode == null ? null : roomActionLock.get(roomCode);
        boolean locked = false;
        try {
            BusinessAssertion.isNull(BusinessEnum.ROOM_NOT_FOUND, roomLock);
            roomLock.lock();
            locked = true;
            MessageContent messageContent = JacksonStringUtil.objectCase(recv, MessageContent.class);
            // Read the attribute VALUE; the original relied on Attribute.toString(), an implementation detail.
            String roomType = String.valueOf(ctx.channel().attr(AttributeKey.valueOf(ROOM_TYPE_KEY)).get());
            // Dispatch by room type (mahjong, poker, ...) and by action.
            IMessageHandler messageHandler = messageHandlerContext.getInstance(roomType, messageContent.getAction());
            messageContent.setUserId(getUserId(ctx));
            messageContent.setRoomCode(roomCode);
            messageHandler.handler(ctx, messageContent);
        } catch (BusinessException e) {
            ctx.writeAndFlush(businessErrorFrame(e));
        } finally {
            if (locked) {
                roomLock.unlock();
            }
        }
    }

    /**
     * Marks the channel's user offline when the handler is removed from the pipeline.
     *
     * <p>Original author's note: a disconnect caused by network failure cannot be observed here,
     * which is why the client is expected to send periodic heartbeats.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        logger.info("=================session is close================= ");
        userOnline(getRoomCode(ctx), getUserId(ctx), false, true);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Writes an HTTP response. For non-OK statuses the status text becomes the body. The connection
     * is kept alive only when the request asked for it and the status is OK; otherwise the channel
     * is closed once the write completes.
     */
    private void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) {
        HttpResponseStatus status = resp.status();
        if (status != HttpResponseStatus.OK) {
            ByteBufUtil.writeUtf8(resp.content(), status.toString());
            // Headers describing the response belong on the response, not on the request.
            HttpUtil.setContentLength(resp, resp.content().readableBytes());
        }
        boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK;
        HttpUtil.setKeepAlive(resp, keepAlive);
        ChannelFuture future = ctx.writeAndFlush(resp);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("unhandled exception on channel {}, closing", ctx.channel(), cause);
        ctx.close();
    }
}

